/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.io;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * A {@link HoodieCreateHandle} that supports CREATE writes performed incrementally (in mini-batches).
 *
 * <p>On top of the parent handle, this class:
 * <ul>
 *   <li>eagerly deletes the base file left behind by the previous task attempt when the task is retried;</li>
 *   <li>rolls over to a new file name (write token suffixed with {@code "-" + rollNumber}) when the
 *       target base file already exists, so that each mini-batch behaves like a write from a different task;</li>
 *   <li>tracks whether the handle has been closed so that {@link #closeGracefully()} is a no-op afterwards.</li>
 * </ul>
 *
 * @see FlinkMergeAndReplaceHandle
 */
public class FlinkCreateHandle<T, I, K, O>
    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkCreateHandle.class);

  /** Set once {@link #close()} has been invoked, whether or not it completed normally. */
  private boolean isClosed = false;

  /**
   * Creates a handle without an explicit schema override; delegates to the full constructor
   * with {@link Option#empty()} as the schema option.
   */
  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                           String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
    this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
        taskContextSupplier);
  }

  /**
   * Creates a handle and, when this is a retried task attempt (attempt id greater than 0),
   * deletes the data file that the immediately preceding attempt may have written.
   */
  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                           String partitionPath, String fileId, Option<Schema> schemaOption,
                           TaskContextSupplier taskContextSupplier) {
    super(config, instantTime, hoodieTable, partitionPath, fileId, schemaOption,
        taskContextSupplier);
    // delete invalid data files generated by task retry.
    if (getAttemptId() > 0) {
      deleteInvalidDataFile(getAttemptId() - 1);
    }
  }

  /**
   * Flink checkpoints start in sequence and asynchronously. When a write task finishes checkpoint (A)
   * (so the fs view already sees the written data files, some of which may be invalid),
   * it immediately goes on with the write of the next checkpoint (B).
   * If it then tries to reuse the last small data bucket (small file) backed by an invalid data file,
   * and the coordinator later receives the checkpoint success event of checkpoint (A),
   * the invalid data file is cleaned,
   * and the writer gets a FileNotFoundException when it closes the write file handle.
   *
   * <p>To solve this, the invalid data file is deleted eagerly
   * so that the small bucket of the invalid file is never reused.
   *
   * @param lastAttemptId The last attempt ID
   * @throws HoodieException if checking for or deleting the file fails with an {@link IOException}
   */
  private void deleteInvalidDataFile(long lastAttemptId) {
    // Rebuild the file name the previous attempt would have used: same instant and file id,
    // but a write token derived from the previous attempt id.
    final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId);
    final String lastDataFileName = FSUtils.makeBaseFileName(instantTime,
        lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
    final StoragePath path = makeNewFilePath(partitionPath, lastDataFileName);
    try {
      if (storage.exists(path)) {
        LOG.info("Deleting invalid INSERT file due to task retry: {}", lastDataFileName);
        storage.deleteFile(path);
      }
    } catch (IOException e) {
      throw new HoodieException("Error while deleting the INSERT file due to task retry: " + lastDataFileName, e);
    }
  }

  /**
   * Creates the marker for the given data file through {@link WriteMarkers#createIfNotExists},
   * using the marker type configured in the write config.
   *
   * @param partitionPath partition path of the data file
   * @param dataFileName  name of the data file the marker refers to
   */
  @Override
  protected void createMarkerFile(String partitionPath, String dataFileName) {
    WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
    writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
  }

  /**
   * Returns the path to write to, rolling over to a new file name while the candidate path already exists.
   *
   * @param partitionPath partition path passed to the parent implementation for the initial candidate
   * @return the first candidate path that does not exist in storage
   * @throws HoodieException if the existence check fails with an {@link IOException}
   */
  @Override
  public StoragePath makeNewPath(String partitionPath) {
    StoragePath path = super.makeNewPath(partitionPath);
    // If the data file already exists, it means the write task write new data bucket multiple times
    // in one hoodie commit, rolls over to a new name instead.

    // Write to a new file which behaves like a different task write.
    try {
      int rollNumber = 0;
      while (storage.exists(path)) {
        StoragePath existing = path;
        path = newFilePathWithRollover(rollNumber++);
        LOG.warn("Duplicate write for INSERT bucket with path: {}, rolls over to new path: {}", existing, path);
      }
      return path;
    } catch (IOException e) {
      throw new HoodieException("Checking existing path for create handle error: " + path, e);
    }
  }

  /**
   * Always accepts the record.
   *
   * @param record the record about to be written (not inspected)
   * @return {@code true} unconditionally
   */
  @Override
  public boolean canWrite(HoodieRecord record) {
    return true;
  }

  /**
   * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
   *
   * @param rollNumber the rollover sequence number appended to the write token
   * @return the file path under this handle's partition path built from the rolled-over write token
   */
  private StoragePath newFilePathWithRollover(int rollNumber) {
    final String dataFileName = FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, fileId,
        hoodieTable.getBaseFileExtension());
    return makeNewFilePath(partitionPath, dataFileName);
  }

  /**
   * Closes the handle via the parent implementation and marks it as closed,
   * even when the parent close throws.
   *
   * @return the write statuses returned by the parent close
   */
  @Override
  public List<WriteStatus> close() {
    try {
      return super.close();
    } finally {
      this.isClosed = true;
    }
  }

  /**
   * Closes the handle if it has not been closed yet, never propagating a failure to the caller.
   *
   * <p>If closing fails, the failure is logged and a best-effort attempt is made to delete
   * the data file at {@code path}; a failure of that deletion is logged and ignored as well.
   */
  @Override
  public void closeGracefully() {
    if (isClosed) {
      return;
    }
    try {
      close();
    } catch (Throwable throwable) {
      LOG.warn("Error while trying to dispose the CREATE handle", throwable);
      try {
        storage.deleteFile(path);
        LOG.info("Deleting the intermediate CREATE data file: {} success!", path);
      } catch (IOException e) {
        // logging a warning and ignore the exception.
        // The trailing throwable is not bound to a placeholder, so SLF4J logs it with its stack trace.
        LOG.warn("Deleting the intermediate CREATE data file: {} failed", path, e);
      }
    }
  }

  /**
   * @return the path this handle writes to
   */
  @Override
  public StoragePath getWritePath() {
    return path;
  }
}
